package com.unknown.chainevent.biz.event;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import cn.hutool.cron.CronUtil;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.unknown.chainevent.biz.service.ChainConfigService;
import com.unknown.chainevent.biz.service.ContractConfigService;
import com.unknown.chainevent.biz.service.ContractLogsService;
import com.unknown.chainevent.common.constants.ChainEventConstants;
import com.unknown.chainevent.common.entity.ChainConfig;
import com.unknown.chainevent.common.entity.ContractConfig;
import com.unknown.chainevent.common.entity.ContractLogs;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.cloud.utils.CollectionUtil;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.EthBlock;
import org.web3j.protocol.core.methods.response.EthLog;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.protocol.http.HttpService;

/**
 * Contract event-log scanning logic.
 *
 * <p>An instance is bound to one {@link ChainConfig}. Once {@link #startTask()} has registered the cron task and
 * {@link #start()} has cleared the stop flag, every run asks the RPC node for the logs emitted by the configured
 * contract addresses and stores the ones that are not persisted yet.
 *
 * @author Administrator
 */
@Slf4j
@NoArgsConstructor
public abstract class BaseLogFilter {

    /** Largest distance between the first and the last block requested in one {@code eth_getLogs} call. */
    private static final long MAX_BLOCKS_PER_SCAN = 500L;

    /** {@code true} while scanning is paused; a fresh instance is paused until {@link #start()} is called. */
    private final AtomicBoolean STOP = new AtomicBoolean(true);
    /** Not read or written by this class; presumably a "re-initialise requested" flag for callers - TODO confirm. */
    public final AtomicBoolean RE_INIT = new AtomicBoolean(false);
    /** Height of the last block that has already been scanned. */
    @Setter
    protected long lastBlock;
    @Setter
    private Web3j web3j;
    @Setter
    private ChainConfigService chainConfigService;
    @Setter
    private ContractConfigService contractConfigService;
    @Setter
    private ContractLogsService contractLogsService;
    @Setter
    private ChainConfig chainConfig;
    /** Lower-cased addresses of the contracts to listen to. */
    @Setter
    private List<String> contractAddressList;
    /** Contract configuration keyed by lower-cased contract address. */
    @Setter
    private Map<String, ContractConfig> contractConfigMap;
    /** Id under which the scan task is registered with the cron scheduler. */
    @Getter
    private String taskId;

    /**
     * Creates a filter for one chain, loads its contract configuration and verifies that the RPC endpoint
     * really serves the configured chain.
     *
     * @param chainConfigService    service used to persist the scan progress
     * @param contractConfigService service used to load the contracts of the chain
     * @param contractLogsService   service used to store the collected logs
     * @param chainConfig           configuration of the chain to scan; its RPC address must not be empty
     * @throws IOException      if the chain id cannot be fetched from the RPC endpoint
     * @throws RuntimeException if the chain id reported by the endpoint differs from the configured one
     */
    public BaseLogFilter(ChainConfigService chainConfigService, ContractConfigService contractConfigService,
        ContractLogsService contractLogsService, ChainConfig chainConfig) throws IOException {
        this.chainConfigService = chainConfigService;
        this.contractConfigService = contractConfigService;
        this.contractLogsService = contractLogsService;
        this.chainConfig = chainConfig;
        Assert.isTrue(!ObjectUtils.isEmpty(chainConfig.getRpcAddress()), chainConfig.getChainSymbol() + "RPC地址不能为空");
        this.web3j = Web3j.build(new HttpService(chainConfig.getRpcAddress()));
        this.taskId = String.format("chain_task_%s", chainConfig.getChainConfigId());
        initContract();
        BigInteger chainId = this.web3j.ethChainId().send().getChainId();
        if (chainId.longValue() != (chainConfig.getChainId())) {
            log.warn("{} 链ID和RPC地址不匹配", chainConfig.getChainSymbol());
            throw new RuntimeException(chainConfig.getChainSymbol() + "链ID和RPC地址不匹配");
        }
    }

    /**
     * (Re)loads the contracts of this chain and rebuilds the address lookup structures.
     *
     * <p>Only contracts with a non-empty address and status {@code NORMAL} are kept. Both lookup structures are
     * always replaced, so contracts that were disabled or removed since the previous call stop being scanned;
     * when nothing is active they become empty collections.
     */
    public void initContract() {
        List<ContractConfig> contractConfigList = this.contractConfigService.list(
            Wrappers.lambdaQuery(ContractConfig.class).eq(ContractConfig::getChainConfigId, this.chainConfig.getChainConfigId()));
        Map<String, ContractConfig> activeContracts = new HashMap<>();
        if (!ObjectUtils.isEmpty(contractConfigList)) {
            activeContracts = contractConfigList.stream()
                .filter(_v -> !ObjectUtils.isEmpty(_v.getContractAddress())
                    && _v.getContractStatus() == ChainEventConstants.ContractStatusConstants.NORMAL.getStatus())
                // Addresses are lower-cased so that they can be matched against log addresses later on.
                .collect(Collectors.toMap(_v -> _v.getContractAddress().toLowerCase(), Function.identity()));
        }
        // Same assignment order as before: map first, then the address list derived from it.
        contractConfigMap = activeContracts;
        contractAddressList = new ArrayList<>(activeContracts.keySet());
    }

    /**
     * Replaces the web3j client with a new one built from the configured RPC address.
     */
    public void initWeb3() {
        // NOTE(review): the previous client is not shut down here because it may have been injected through
        // setWeb3j and shared with other code - confirm ownership before adding a shutdown() call.
        this.web3j = Web3j.build(new HttpService(chainConfig.getRpcAddress()));
    }

    /**
     * Registers the scan task (every 3 seconds) and makes sure the shared cron scheduler is running.
     *
     * <p>Each run is skipped while no contract is configured or while the filter is stopped. If registering
     * the task fails, the error is logged and the filter is stopped.
     */
    public void startTask() {
        lastBlock = chainConfig.getBlockNumber();
        try {
            CronUtil.schedule(this.taskId, "0/3 * * * * ?", () -> {
                if (ObjectUtils.isEmpty(contractAddressList)) {
                    log.info("{}[{}],事件监听任务，未找到合约配置", this.taskId, chainConfig.getRpcAddress());
                    return;
                }
                if (STOP.get()) {
                    log.info("{}[{}],事件监听任务已经停止", this.taskId, chainConfig.getRpcAddress());
                    return;
                }
                TimeInterval timer = DateUtil.timer();
                log.info("{},[{}],区块高度[{}],事件监听任务执行中...", this.taskId, chainConfig.getRpcAddress(), lastBlock);
                try {
                    runScan();
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Keep the interrupt visible to the thread that runs the task.
                        Thread.currentThread().interrupt();
                    }
                    log.error("{}[{}],事件监听执行异常中断 - [error: {}]", this.taskId, chainConfig.getRpcAddress(), e.getMessage(), e);
                }
                log.info("{}[{}],事件监听任务完成，耗时：{}ms", this.taskId, chainConfig.getRpcAddress(), timer.interval());
            });
            // The cron expression has a seconds field, so second-level matching must be enabled.
            CronUtil.setMatchSecond(true);
            // The scheduler is shared by all filters and refuses to be started twice; only the first
            // filter starts it, later ones merely add their task to the running scheduler.
            if (!CronUtil.getScheduler().isStarted()) {
                CronUtil.start();
            }
        } catch (Exception e) {
            log.error("{}[{}],事件监听任务启动异常 - [error: {}]", this.taskId, chainConfig.getRpcAddress(), e.getMessage(), e);
            this.stop();
        }
    }

    /**
     * Scans the next block range for contract logs, stores them and advances the scan progress.
     *
     * <p>The range starts {@code scanOldBlock} blocks before the first unscanned block (never below 0) and ends
     * at most {@link #MAX_BLOCKS_PER_SCAN} blocks later, never beyond the latest height minus the configured
     * number of distrusted blocks.
     *
     * @throws IOException          if an RPC request fails
     * @throws ExecutionException   if fetching the latest block number fails
     * @throws InterruptedException if the thread is interrupted while waiting for the latest block number
     */
    protected void runScan() throws IOException, ExecutionException, InterruptedException {
        long currentBlockNumber = lastBlock;
        BigInteger latestHeight = web3j.ethBlockNumber().sendAsync().get().getBlockNumber();
        // The newest blocks are not processed yet (subtract the distrusted blocks).
        BigInteger useLatestHeight = latestHeight.subtract(BigInteger.valueOf(chainConfig.getDistrustBlock()));
        long usableHeight = useLatestHeight.longValue();
        if (usableHeight <= currentBlockNumber) {
            return;
        }
        // Re-scan some older blocks as configured; clamp at 0 because a negative block number cannot be
        // encoded in the request.
        long startBlock = Math.max(0L, currentBlockNumber + 1 - chainConfig.getScanOldBlock());
        // Limit the size of a single query.
        BigInteger endBlock = BigInteger.valueOf(Math.min(usableHeight, startBlock + MAX_BLOCKS_PER_SCAN));
        // NOTE(review): hard-coded diagnostic for one block on chain id 56 - looks like leftover debugging.
        if (chainConfig.getChainId() == 56 && startBlock <= 36387573 && 36387573 <= endBlock.longValue()) {
            log.info("区块调度{},{}", startBlock, endBlock);
        }
        EthFilter ethFilter = new EthFilter(DefaultBlockParameter.valueOf(BigInteger.valueOf(startBlock)),
            DefaultBlockParameter.valueOf(endBlock), contractAddressList);
        EthLog ethLog = web3j.ethGetLogs(ethFilter).send();
        Assert.isTrue(!ethLog.hasError(),
            "获取事件异常-" + (ethLog.getError() != null && ethLog.getError().getMessage() != null ? ethLog.getError().getMessage() : ""));
        List<EthLog.LogResult> logs = ethLog.getLogs();
        if (CollectionUtil.single().isNotEmpty(logs)) {
            saveEvent(logs, latestHeight.longValue());
        }
        updateLastBlock(endBlock.longValue());
    }

    /**
     * Records the given height as the last scanned block, in memory and through the chain config service.
     * Does nothing while the filter is stopped.
     *
     * @param lastBlock height of the last block that has been scanned
     */
    protected void updateLastBlock(long lastBlock) {
        if (STOP.get()) {
            return;
        }
        this.lastBlock = lastBlock;
        log.info("更新区块调度[{}]", lastBlock);
        chainConfig.setBlockNumber(lastBlock);
        chainConfigService.updateLastBlock(chainConfig.getChainConfigId(), lastBlock);
    }

    /**
     * Converts the given logs into {@link ContractLogs} rows and stores the ones that are not persisted yet.
     *
     * <p>Logs of addresses that are not being listened to are skipped, as are logs for which a row with the
     * same transaction hash and log index already exists.
     *
     * @param logs         log results returned by the node
     * @param latestHeight latest chain height, used to decide whether a log is already confirmed
     * @throws IOException if fetching a block from the node fails
     */
    protected void saveEvent(List<EthLog.LogResult> logs, long latestHeight) throws IOException {
        if (CollectionUtil.single().isEmpty(logs)) {
            return;
        }
        // Blocks are cached by hash so that each block is fetched only once per batch.
        Map<String, EthBlock.Block> blockMap = new HashMap<>();
        List<ContractLogs> saveLogList = new ArrayList<>();
        for (EthLog.LogResult logResult : logs) {
            Log eventLog = (Log) logResult.get();
            String rawAddress = eventLog.getAddress();
            if (rawAddress == null) {
                continue;
            }
            // The configured addresses are stored lower-cased, so normalise before matching.
            String contractAddress = rawAddress.toLowerCase();
            // Skip logs of contracts that are not being listened to.
            if (!contractAddressList.contains(contractAddress)) {
                continue;
            }
            String transactionHash = eventLog.getTransactionHash();
            Long logIndex = eventLog.getLogIndex().longValue();
            if (contractLogsService.existsByHashAndIndex(transactionHash, logIndex)) {
                continue;
            }
            EthBlock.Block block = blockMap.get(eventLog.getBlockHash());
            if (block == null) {
                block = web3j.ethGetBlockByHash(eventLog.getBlockHash(), false).send().getBlock();
                blockMap.put(eventLog.getBlockHash(), block);
            }
            List<String> topics = eventLog.getTopics();
            String data = eventLog.getData();
            data = "0x".equals(data) ? "0x0" : data;
            ContractLogs contractLogs = new ContractLogs();
            contractLogs.setChainConfigId(chainConfig.getChainConfigId());
            contractLogs.setChainSymbol(chainConfig.getChainSymbol());
            contractLogs.setContractConfigId(contractConfigMap.get(contractAddress).getContractConfigId());
            contractLogs.setContractAddress(contractAddress);
            contractLogs.setTransHash(transactionHash);
            contractLogs.setIndexForBlock(logIndex);
            contractLogs.setBlockNumber(block.getNumber().longValue());
            contractLogs.setBlockTimestamp(new Date(block.getTimestamp().longValue() * 1000));

            // A log may carry no topics at all (anonymous events), so the first topic is guarded as well.
            int topicCount = topics == null ? 0 : topics.size();
            if (topicCount >= 1) {
                contractLogs.setFirstTopic(topics.get(0));
            }
            if (topicCount >= 2) {
                contractLogs.setSecondTopic(topics.get(1));
            }
            if (topicCount >= 3) {
                contractLogs.setThirdTopic(topics.get(2));
            }
            if (topicCount >= 4) {
                contractLogs.setFourthTopic(topics.get(3));
            }
            contractLogs.setLogData(data);
            contractLogs.setStatus(ChainEventConstants.ContractLogStatus.UNTREATED.getStatus());
            // Confirmed once the block is at least confirmBlock blocks behind the latest height.
            contractLogs.setConfirmStatus(latestHeight - block.getNumber().longValue() >= chainConfig.getConfirmBlock()
                ? ChainEventConstants.ContractLogConfirmStatus.SUCCESS.getStatus()
                : ChainEventConstants.ContractLogConfirmStatus.AWAIT.getStatus());
            saveLogList.add(contractLogs);
        }
        if (CollectionUtil.single().isNotEmpty(saveLogList)) {
            boolean b = contractLogsService.saveBatch(saveLogList);
            Assert.isTrue(b, chainConfig.getChainSymbol() + ": log保存失败");
        }

    }

    /**
     * Resumes scanning by clearing the stop flag.
     */
    public void start() {
        STOP.compareAndSet(true, false);
    }

    /**
     * Pauses scanning by setting the stop flag; the cron task stays registered but skips its work.
     */
    public void stop() {
        STOP.compareAndSet(false, true); // set to true so that scheduled runs return early
        log.info("任务 {} 已停止", taskId);
    }

    /**
     * @return {@code true} if scanning is currently stopped
     */
    public boolean isStop() {
        return STOP.get();
    }
}
